Kafka was originally created by Jay Kreps and his team at LinkedIn.
A typical Kafka cluster consists of multiple brokers, which use ZooKeeper to maintain their cluster state.
Each topic partition has one broker acting as leader and zero or more acting as followers.
Think of the followers as backups for the leader: if the leader fails, one of the followers is elected as the new leader.
Producers push data to brokers.
When publishing data, a producer looks up the elected leader (broker) of the respective topic partition and automatically sends the message to that leader broker.
Consumers read messages from brokers.
A consumer maintains its own state (the offsets it has consumed) with the help of ZooKeeper, since Kafka brokers are stateless.
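For the consuming side, here is a minimal sketch (not part of the original project; the broker address, group id, and poll timeout are assumptions, and the poll(Long) signature matches the 0.10.x-era clients used in this notebook):

import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConverters._

object PollutionConsumer extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092") // assumption: local broker
  props.put("group.id", "plume-consumers")         // hypothetical consumer group
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

  val consumer = new KafkaConsumer[String, String](props)
  consumer.subscribe(Collections.singletonList("plume_pollution"))

  while (true) {
    // poll blocks for up to 1000 ms waiting for new records
    val records = consumer.poll(1000)
    for (record <- records.asScala)
      println(s"partition=${record.partition()} offset=${record.offset()} value=${record.value()}")
  }
}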
Database analogy
Database tables are the topics in Kafka,
applications that insert data into tables are producers, and
applications that read data are consumers.
Kafka cannot work without ZooKeeper. Kafka uses ZooKeeper for the following:
- electing the cluster controller, which in turn manages partition leaders
- cluster membership, i.e. tracking which brokers are alive
- storing topic configuration such as partition counts and replica assignments
- storing consumer offsets (in older consumer versions)
**main** takes the command-line arguments and starts the producer application:
import java.util.Properties
import scala.io.Source
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}

object KafkaBroker {

  case class Coordinates(lat: Double, lon: Double)

  def main(args: Array[String]): Unit = {
    // parameters
    val topic     = args(0)          // plume_pollution
    val brokers   = args(1)          // localhost:9092 - "broker1:port,broker2:port"
    val lat       = args(2).toDouble // latitude - test value: 48.85
    val lon       = args(3).toDouble // longitude - test value: 2.294
    val sleepTime = args(4).toInt    // 1000 - time between queries to API

    // use 'lat' and 'lon' to create a Coordinates object
    val location = Coordinates(lat, lon)
    startIngestion(brokers, topic, location, sleepTime)
  } // end of main
**startBroker** creates a new KafkaProducer with the specified properties.
Below are the 3 mandatory configuration parameters:
bootstrap.servers: The list of Kafka broker addresses, each in the form hostname:port. You can specify one or more brokers; we recommend providing at least 2, so that if one broker goes down the producer can use the other.
key.serializer: Messages are sent to Kafka brokers as key-value pairs, and brokers expect the key and value to be byte arrays. This property tells the producer which serializer class to use to convert the message key into a byte array. Kafka provides 3 built-in serializer classes: ByteArraySerializer, StringSerializer and IntegerSerializer. All of them live in the org.apache.kafka.common.serialization package and implement the Serializer interface.
value.serializer: Similar to key.serializer, but this property tells the producer which class to use to serialize the message value. You can also implement your own serializer class and assign it to this property, as sketched below.
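On that last point, a custom serializer only has to implement the Serializer interface. A hypothetical example (not part of the project) for the Coordinates case class defined above:

import java.util
import org.apache.kafka.common.serialization.Serializer

// hypothetical: encodes Coordinates as the UTF-8 string "lat,lon"
class CoordinatesSerializer extends Serializer[KafkaBroker.Coordinates] {
  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
  override def serialize(topic: String, data: KafkaBroker.Coordinates): Array[Byte] =
    if (data == null) null else s"${data.lat},${data.lon}".getBytes("UTF-8")
  override def close(): Unit = ()
}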
/**
* Helper function to create a KafkaProducer from the brokers' host and port information
*
* @param brokers Broker information in the format 'localhost:9092'
* or "broker1:port,broker2:port"
*
* @return KafkaProducer[String, String]
*/
def startBroker(brokers: String): KafkaProducer[String, String] = {
  // Kafka producer properties
  val props = new Properties()
  props.put("bootstrap.servers", brokers)
  props.put("client.id", "ScalaKafkaProducer")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("acks", "all")
  props.put("retries", Integer.valueOf(1))
  // props.put("batch.size", Integer.valueOf(16384))
  // props.put("linger.ms", Integer.valueOf(1))
  // props.put("buffer.memory", Integer.valueOf(33554432))
  // TODO: implement ProducerCallback()
  new KafkaProducer[String, String](props)
} // end of startBroker
**startIngestion** has two objectives: query the plume pollution API for the given location at a fixed interval, and publish every response to the Kafka topic.
/**
* Queries plume pollution API for a particular 'location' (lat, long) in an interval defined by 'sleepTime'
* and creates a KafkaProducer to ingest content
*
* @param brokers Broker information in the format 'localhost:9092'
* or "broker1:port,broker2:port"
* @param topic Topic to publish message to
* @param location Latitude and Longitude to query pollution
* @param sleepTime Time interval between queries to plume API
*
*/
def startIngestion(brokers: String, topic: String, location: Coordinates, sleepTime: Int): Unit = {
  // access plume token https://github.com/zipfian/cartesianproduct2/wiki/TOKEN
  lazy val token: Option[String] = sys.env.get("PLUMETOKEN") orElse {
    println("No token found. Check how to set it up at https://github.com/zipfian/cartesianproduct2/wiki/TOKEN")
    None
  }
  // fail fast instead of throwing on token.get inside the loop
  val apiToken = token.getOrElse(sys.error("PLUMETOKEN is not set"))

  // create the producer once and reuse it for every query
  val producer = startBroker(brokers)

  while (true) {
    // query web API - response will be a String
    val response = Source.fromURL(
      "https://api.plume.io/1.0/pollution/forecast?token=" + apiToken + "&lat=" + location.lat + "&lon=" + location.lon
    ).mkString

    val producerRecord = new ProducerRecord[String, String](topic, response)
    val recordMetadata = producer.send(producerRecord)
    val meta = recordMetadata.get() // blocks until the broker acks; could be used to write some tests

    val msgLog =
      s"""
         |topic     = ${meta.topic()}
         |offset    = ${meta.offset()}
         |partition = ${meta.partition()}
      """.stripMargin
    println(msgLog)

    // pause between queries ('sleepTime' argument)
    Thread.sleep(sleepTime)
  } // end of infinite loop
} // end of startIngestion
} // end of KafkaBroker object
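To compile and run KafkaBroker, the Kafka client library has to be on the classpath. A minimal build.sbt sketch (the version shown is an assumption; use the one matching your cluster):

libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.10.2.1"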
Kafka provides a Callback interface that lets us handle the broker's reply to each message, whether the send succeeded or failed:
The callback is attached via the overloaded send(ProducerRecord, Callback):

class ProducerCallback extends Callback {
  override def onCompletion(recordMetadata: RecordMetadata, ex: Exception): Unit = {
    if (ex != null) {
      // handle the exception raised during the send
      ex.printStackTrace()
    }
    else {
      // same logging as in startIngestion, but onCompletion already
      // receives the RecordMetadata - no blocking get() needed
      val msgLog =
        s"""
           |topic     = ${recordMetadata.topic()}
           |offset    = ${recordMetadata.offset()}
           |partition = ${recordMetadata.partition()}
        """.stripMargin
      println(msgLog)
    } // end of else
  } // end of onCompletion
} // end of ProducerCallback class
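With the callback in place, the blocking recordMetadata.get() in startIngestion can be dropped and the send becomes fully asynchronous (a sketch reusing producer and producerRecord from above):

// the broker's reply is handled by ProducerCallback.onCompletion
// on the producer's I/O thread, so the ingestion loop never blocks
producer.send(producerRecord, new ProducerCallback())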
Kafka Bash commands
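A few common commands (a sketch, assuming Kafka is run from its installation directory and using the plume_pollution topic from above):

# start ZooKeeper and a Kafka broker
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

# create the topic used by the producer
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 1 --topic plume_pollution

# list existing topics
bin/kafka-topics.sh --list --zookeeper localhost:2181

# read the ingested messages from the beginning
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic plume_pollution --from-beginning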
Links to documentation (Kafka tutorial)
Link to Scala documentation
Link to Resources